package com.storm.demo;

import com.google.common.base.Preconditions;
import org.apache.commons.cli.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.metric.LoggingMetricsConsumer;
import org.apache.storm.shade.org.joda.time.DateTime;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.util.*;

/**
 * Base class for Storm topologies: parses the command line, loads the
 * properties resource, builds the Storm {@link Config} and submits the
 * topology either to an in-process {@link LocalCluster} or to a real cluster.
 *
 * <p>Subclasses only provide the wiring of spouts and bolts through
 * {@link #createTopology(Properties)}.
 */
public abstract class BasicTopology {


    public static final String HASH_TAG = "hash_tag";
    public static final String TRACE = "trace";
    public static final String INPUT_MESSAGE = "input_message";
    public static final Fields HASH_FIELDS = new Fields(HASH_TAG);

    /** Key under which the path of the loaded properties resource is stored in the props. */
    public static final String PROPS_PATH = "props_path";

    /**
     * Worker count applied when {@code topology.workers} is absent or not positive.
     * Left non-final so existing code that assigns it keeps compiling.
     */
    public static int DEFAULT_WORKERS = 1;

    /** Command-line option carrying the properties resource path. */
    private static final String OPT_PROPS_FILE = "propsfile";

    /** Command-line option carrying {@code -Dkey=value} overrides. */
    private static final String OPT_PROPERTY = "D";

    /** Property holding the topology base name; mandatory. */
    private static final String PROP_TOPOLOGY_NAME = "topo.name";

    /**
     * Parses the command line.
     *
     * <p>Recognised options: {@code -propsfile <path>} (properties resource) and
     * any number of {@code -Dkey=value} overrides.
     *
     * @param args raw program arguments
     * @return the parsed command line
     * @throws ParseException if the arguments do not match the declared options
     */
    public CommandLine setupCommandLine(String[] args) throws ParseException {
        Options options = new Options();
        // The option name must match the one read back in setupProps; the description
        // literal is user-facing help text and is kept as-is.
        options.addOption(OPT_PROPS_FILE, true, "运行参数文件");
        Option property = Option.builder(OPT_PROPERTY)
                .argName("property=value")
                .hasArg()
                .numberOfArgs(2)
                .valueSeparator()
                .build();
        options.addOption(property);

        CommandLineParser parser = new DefaultParser();
        return parser.parse(options, args);
    }

    /**
     * Loads the properties resource named by {@code -propsfile} and then applies the
     * {@code -Dkey=value} command-line overrides on top of it.
     *
     * <p>The resource is looked up with {@code BeanRepository.class.getResourceAsStream},
     * i.e. on the classpath. The path itself is stored under {@link #PROPS_PATH}
     * before the resource is loaded.
     *
     * @param cmd parsed command line, as returned by {@link #setupCommandLine(String[])}
     * @return the merged properties
     * @throws IOException              if the resource cannot be found or read
     * @throws IllegalArgumentException if {@code -propsfile} is missing or
     *                                  {@code topo.name} is not configured
     */
    public Properties setupProps(CommandLine cmd) throws IOException {
        String propsPath = cmd.getOptionValue(OPT_PROPS_FILE);
        Preconditions.checkArgument(StringUtils.isNotEmpty(propsPath),
                "missing required option -%s", OPT_PROPS_FILE);

        Properties props = new Properties();
        props.setProperty(BasicTopology.PROPS_PATH, propsPath);
        try (InputStream inputStream = BeanRepository.class.getResourceAsStream(propsPath)) {
            // getResourceAsStream returns null for a missing resource; fail with the path
            // instead of letting Properties.load throw a bare NullPointerException.
            if (inputStream == null) {
                throw new IOException("properties resource not found: " + propsPath);
            }
            props.load(inputStream);
        }
        if (cmd.hasOption(OPT_PROPERTY)) {
            Properties overrides = cmd.getOptionProperties(OPT_PROPERTY);
            for (String key : overrides.stringPropertyNames()) {
                props.setProperty(key, overrides.getProperty(key));
            }
        }
        // Check the settings every topology must provide.
        Preconditions.checkArgument(StringUtils.isNotEmpty(props.getProperty(PROP_TOPOLOGY_NAME)),
                "missing required property %s", PROP_TOPOLOGY_NAME);
        return props;
    }

    /**
     * Builds the Storm {@link Config}: project defaults first, then every property
     * whose key starts with {@code topology.} overrides them.
     *
     * <p>The worker, message-timeout, acker-executor, max-spout-pending and
     * max-task-parallelism keys are stored as integers; all other
     * {@code topology.*} keys are stored as strings.
     *
     * @param props merged properties, as returned by {@link #setupProps(CommandLine)}
     * @return the Storm configuration
     */
    public Config setupConfig(Properties props) {
        Config config = new Config();
        config.setFallBackOnJavaSerialization(false);
        config.setSkipMissingKryoRegistrations(false);

        config.registerSerialization(Date.class);
        config.registerSerialization(BigDecimal.class);
        config.registerSerialization(HashMap.class);
        config.registerSerialization(Map.class);
        config.registerSerialization(LinkedList.class);


        config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 180);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2000);
        config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
        config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
        // topology.acker.executors is deliberately left unset here: the worker count is
        // not known yet at this point, and Storm documents that an unset value means
        // one acker executor per worker. It can still be set through the properties.
        config.registerMetricsConsumer(LoggingMetricsConsumer.class, 2);

        for (String key : props.stringPropertyNames()) {
            String value = props.getProperty(key);
            if (StringUtils.equalsAny(key,
                    Config.TOPOLOGY_WORKERS,
                    Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS,
                    Config.TOPOLOGY_ACKER_EXECUTORS,
                    Config.TOPOLOGY_MAX_SPOUT_PENDING,
                    Config.TOPOLOGY_MAX_TASK_PARALLELISM
            )) {
                // NumberUtils.toInt yields 0 for a value that is not a valid integer.
                config.put(key, NumberUtils.toInt(value));
            } else if (key.startsWith("topology.")) {
                config.put(key, value);
            }
        }

        Integer workers = (Integer) config.get(Config.TOPOLOGY_WORKERS);
        if (workers == null || workers <= 0) {
            config.setNumWorkers(DEFAULT_WORKERS);
        }
        return config;
    }

    /**
     * Creates a {@link KafkaSpout} from the {@code kafka.*} properties.
     *
     * <p>Properties read: {@code topo.name}, {@code kafka.topic},
     * {@code kafka.brokerZkStr}, {@code kafka.brokerZkPath},
     * {@code kafka.offset.zkServers} (comma separated), {@code kafka.offset.zkPort}
     * and {@code kafka.offset.zkRoot}. The spout id is {@code <topo.name>-<kafka.topic>}.
     *
     * @param props merged properties
     * @return the configured spout
     * @throws IllegalArgumentException if {@code kafka.offset.zkServers} or
     *                                  {@code kafka.offset.zkPort} is missing
     * @throws NumberFormatException    if {@code kafka.offset.zkPort} is not an integer
     */
    public KafkaSpout createKafkaSpout(Properties props) {
        String topologyName = props.getProperty(PROP_TOPOLOGY_NAME);
        String topic = props.getProperty("kafka.topic");
        String brokerZkStr = props.getProperty("kafka.brokerZkStr");
        String brokerZkPath = props.getProperty("kafka.brokerZkPath");
        // These two would otherwise fail below with an NPE / "NumberFormatException: null"
        // that does not say which setting is missing.
        String zkServersValue = requireProperty(props, "kafka.offset.zkServers");
        String zkPortValue = requireProperty(props, "kafka.offset.zkPort");
        List<String> zkServers = Arrays.asList(StringUtils.split(zkServersValue, ","));
        Integer zkPort = Integer.parseInt(zkPortValue);
        String zkRoot = props.getProperty("kafka.offset.zkRoot");
        String id = StringUtils.join(topologyName, "-", topic);

        BrokerHosts kafkaBrokerZk = new ZkHosts(brokerZkStr, brokerZkPath);
        SpoutConfig spoutConfig = new SpoutConfig(kafkaBrokerZk, topic, zkRoot, id);
        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = zkPort;
        spoutConfig.zkRoot = zkRoot;
        spoutConfig.stateUpdateIntervalMs = 30000;
        return new KafkaSpout(spoutConfig);
    }

    /**
     * Runs the topology in an in-process {@link LocalCluster} for local testing,
     * with debug enabled and a single worker.
     *
     * <p>NOTE(review): the cluster is not shut down by this method.
     *
     * @param args raw program arguments
     * @throws Exception if parsing, loading or submission fails
     */
    public void runLocal(String[] args) throws Exception {
        CommandLine cmd = setupCommandLine(args);
        Properties props = setupProps(cmd);
        Config config = setupConfig(props);

        config.setDebug(true);
        config.setNumWorkers(1);

        TopologyBuilder builder = createTopology(props);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(timestampedTopologyName(props), config, builder.createTopology());
    }

    /**
     * Submits the topology to the Storm cluster.
     *
     * @param args raw program arguments
     * @throws Exception if parsing, loading or submission fails
     */
    public void run(String[] args) throws Exception {
        CommandLine cmd = setupCommandLine(args);
        Properties props = setupProps(cmd);
        Config config = setupConfig(props);
        TopologyBuilder builder = createTopology(props);

        StormSubmitter.submitTopologyWithProgressBar(
                timestampedTopologyName(props), config, builder.createTopology());
    }

    /**
     * Wires the spouts and bolts of the concrete topology.
     *
     * @param props merged properties
     * @return the builder holding the topology definition
     */
    protected abstract TopologyBuilder createTopology(Properties props);

    /** Builds the submission name: {@code <topo.name>-<yyyyMMdd-HHmmss>}. */
    private static String timestampedTopologyName(Properties props) {
        return StringUtils.join(props.getProperty(PROP_TOPOLOGY_NAME), "-",
                DateTime.now().toString("yyyyMMdd-HHmmss"));
    }

    /** Returns the value of {@code key}, rejecting a missing or empty value with a clear message. */
    private static String requireProperty(Properties props, String key) {
        String value = props.getProperty(key);
        Preconditions.checkArgument(StringUtils.isNotEmpty(value),
                "missing required property %s", key);
        return value;
    }


}
